package cn.zhaopin.starter.mq.core;

import cn.zhaopin.starter.mq.common.PulsarMessage;
import cn.zhaopin.starter.mq.properties.PulsarProperties;
import cn.zhaopin.starter.mq.support.JSON2Schema;
import cn.zhaopin.starter.mq.support.SnowFlakeIdGenerator;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;

/**
 * Description: factory that builds TDMQ (Pulsar) producers.
 *
 * <p>Each producer is created from the shared {@link PulsarClient}, carries
 * {@link PulsarMessage} payloads and is given a generated, per-instance name.</p>
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/19-10:37
 */
public class PulsarProducerFactory {

    private static final Logger log = LoggerFactory.getLogger(PulsarProducerFactory.class);

    /** Producer name format: DEFAULT_PRODUCER_{env}_{topic}. */
    private static final String PRODUCER_NAME_PATTERN = "DEFAULT_PRODUCER_%s_%s";

    private final PulsarClient client;
    private final Environment environment;
    private final PulsarProperties pulsarProperties;

    // Supplies the suffix appended to every producer name in create().
    // NOTE(review): the meaning of the two constructor arguments (10, 10) is not
    // visible from this file - confirm against SnowFlakeIdGenerator.
    private final SnowFlakeIdGenerator idGenerator = new SnowFlakeIdGenerator(10, 10);

    private final PulsarTopicFactory topicFactory;

    /**
     * Creates a factory bound to the given client and configuration.
     *
     * @param client           Pulsar client used to build producers
     * @param environment      Spring environment, read for the {@code app.env} property
     * @param pulsarProperties Pulsar settings, also handed to the internal {@link PulsarTopicFactory}
     */
    public PulsarProducerFactory(PulsarClient client, Environment environment, PulsarProperties pulsarProperties) {
        this.client = client;
        this.environment = environment;
        this.pulsarProperties = pulsarProperties;
        this.topicFactory = new PulsarTopicFactory(pulsarProperties);
    }

    /**
     * <p>Creates a simple producer that can be used for sync or async sends.</p>
     *
     * <p>The producer name is {@link #getProducerName(String)} followed by {@code "_"} and a
     * freshly generated id; the topic is resolved through
     * {@link PulsarTopicFactory#obtainFullTopic(String)}.</p>
     *
     * @param topic topic
     * @return org.apache.pulsar.client.api.Producer producer
     * @throws MessagingException if the Pulsar client fails to create the producer;
     *                            the original {@link PulsarClientException} is kept as the cause
     */
    public Producer<PulsarMessage> create(String topic) {
        try {
            return client
                    .newProducer(JSON2Schema.of(PulsarMessage.class))
                    .producerName(getProducerName(topic) + "_" + idGenerator.generate())
                    .topic(topicFactory.obtainFullTopic(topic))
                    .create();
        } catch (PulsarClientException e) {
            // Pass the exception as the trailing argument so SLF4J records the stack trace too.
            log.error("Pulsar create producer error , topic {}", topic, e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /**
     * Builds the base producer name for a topic.
     *
     * <p>The environment part comes from the {@code app.env} property and falls back to
     * {@code "dev"} when that property is missing or empty.</p>
     *
     * @param topic topic
     * @return java.lang.String producer name in the form {@code DEFAULT_PRODUCER_{env}_{topic}}
     */
    public String getProducerName(String topic) {
        String appEnv = environment.getProperty("app.env");
        appEnv = StringUtils.hasLength(appEnv) ? appEnv : "dev";
        return String.format(PRODUCER_NAME_PATTERN, appEnv, topic);
    }

    /**
     * Returns the topic factory used by this producer factory.
     *
     * @return the {@link PulsarTopicFactory} created in the constructor
     */
    public PulsarTopicFactory getTopicFactory() {
        return topicFactory;
    }
}
